package com.fwmagic.dynamic_rule.functions;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.QueryRouterServiceV3;
import com.fwmagic.dynamic_rule.service.QueryRouterServiceV4;
import com.fwmagic.dynamic_rule.utils.RuleSimulator;
import com.fwmagic.dynamic_rule.utils.SystemPrintUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 规则处理核心逻辑V4(Flink State && ClickHouse & Cache(redis))
 */
@Slf4j
public class RuleProcessFunctionV4 extends KeyedProcessFunction<String, LogBean, ResultBean> {

    private ListState<LogBean> listState;

    private QueryRouterServiceV4 queryRouterServiceV4;

    @Override
    public void open(Configuration parameters) throws Exception {
        //准备一个底层明细事件的State
        ListStateDescriptor<LogBean> stateDescriptor = new ListStateDescriptor<>("events", LogBean.class);
        //设置State中只存储最近两个小时的数据
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(2)).updateTtlOnCreateAndWrite().build();
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        listState = getRuntimeContext().getListState(stateDescriptor);
        //构造一个查询路由器
        queryRouterServiceV4 = new QueryRouterServiceV4(listState);
    }

    @Override
    public void processElement(LogBean logBean, Context context, Collector<ResultBean> collector) throws Exception {
        //将收到的事件放入历史明细state存储中
        listState.add(logBean);

        //获取规则参数
        RuleParam ruleParam = RuleSimulator.getRuleParam();

        //1.判断是否满足事件触发条件
        if (ruleParam.getTriggerParam().getEventId().equals(logBean.getEventId())) {
            log.debug("{}规则被触发，触发人:{},触发事件:{},触发时间:{}", ruleParam.getRuleId(), logBean.getDeviceId(), logBean.getEventId(), logBean.getTimeStamp());

            //2.查询画像条件
            boolean b = queryRouterServiceV4.userProfileQuery(logBean, ruleParam);
            if (!b) return;

            //3.查询用户行为事件序列条件
            log.debug("规则：{},用户：{} 查询用户行为事件序列条件", ruleParam.getRuleId(), logBean.getDeviceId());
            boolean b2 = queryRouterServiceV4.userActionSequenceQuery(logBean, ruleParam);
            if (!b2) return;
            log.debug("规则：{},用户：{} 用户行为事件序列条件匹配！！！", ruleParam.getRuleId(), logBean.getDeviceId());

            //4.查询用户行为次数条件
            log.debug("规则：{},用户：{} 查询用户行为次数条件", ruleParam.getRuleId(), logBean.getDeviceId());
            boolean b1 = queryRouterServiceV4.userActionCountQuery(logBean, ruleParam);
            if (!b1) return;
            log.debug("规则：{},用户：{} 用户行为次数条件匹配！！！", ruleParam.getRuleId(), logBean.getDeviceId());

            log.info("{}规则，触发人:{},计算匹配成功", ruleParam.getRuleId(), logBean.getDeviceId());
            ResultBean resultBean = new ResultBean();
            resultBean.setTimeStamp(logBean.getTimeStamp());
            resultBean.setDeviceId(logBean.getDeviceId());
            resultBean.setRuleId(ruleParam.getRuleId());
            //收集如何规则匹配成功的结果
            collector.collect(resultBean);
        }
    }
}
